/**
 * Copyright (C) 2010, 2011 Neofonie GmbH
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package eu.dicodeproject.analysis.wordcount;

import eu.dicodeproject.analysis.lucene.IterableAnalyzer;
import eu.dicodeproject.analysis.lucene.CleansingAnalyzer;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.lucene.analysis.Analyzer;

import java.io.IOException;
import java.util.*;

/**
 * Emits words with count 1, using a Lucene Tokenizer.
 */
public class WordCountMapper extends TableMapper<Text, IntWritable> {

  private String dateWordSeparator = "_";
  private String targetWordsConfKey = "targetWords";

  /** Reusable data objects. */
  private Text term = new Text();
  private static IntWritable one = new IntWritable(1);

  /** Calendar to transform longs to dates */
  private Calendar cal = Calendar.getInstance();

  /** Analyzer for target words and tweet text. */
  private Analyzer cleansingAnalyzer = new CleansingAnalyzer();

  @Override
  protected void map(ImmutableBytesWritable row, Result values, Context context) throws IOException,
      InterruptedException {

    // get target words from configuration
    String tw = context.getConfiguration().get(targetWordsConfKey);

    // make a set of analyzed target words
    Set<String> targetWords = new HashSet<String>();
    for (String t : new IterableAnalyzer(cleansingAnalyzer, tw)) {
      targetWords.add(t);
    }

    // result date and found words
    String date = "";
    List<String> foundWords = new LinkedList<String>();

    // collect date and words from this table row
    for (KeyValue keyValue : values.list()) {
      if (isCreationDateColumn(keyValue)) {
        date = getDate(keyValue);
      } else {
        addWords(targetWords, keyValue, foundWords);
      }
    }

    // emit with date and word merged as key and 1 in value
    for (String fw : foundWords) {
      term.set(date+dateWordSeparator+fw);
      context.write(term, one);
    }
  }

  /**
   * Check if this column contains creation date data.
   */
  private byte[] creationDateBytes = Bytes.toBytes("creationDate");
  private boolean isCreationDateColumn(KeyValue kv) {
    return Arrays.equals(kv.getQualifier(), creationDateBytes);
  }

  /**
   * Extract the date in the form YYYY-MM-DD from a column cell.
   * @param kv table cell containing a date as long
   * @return date in the form YYYY-MM-D
   */
  private String getDate(KeyValue kv) {
    long creationDateLong = Bytes.toLong(kv.getValue());
    cal.setTimeInMillis(creationDateLong);
    String year = String.valueOf(cal.get(Calendar.YEAR));
    String month = String.valueOf(cal.get(Calendar.MONTH) + 1); // Calendar.get returns 0 for January
    if (month.length() == 1) {
      month = "0"+month;
    }
    String day = String.valueOf(cal.get(Calendar.DAY_OF_MONTH));
    if (day.length() == 1) {
      day = "0"+day;
    }
    return year+"-"+month+"-"+day;
  }

  /**
   * Add words of a table cell to a list, if the word is in the target words set.
   * @param targetWords set of words that we are searching for
   * @param kv table cell
   * @param foundWords list of words that are targets and are in Tweet text
   * @throws IOException inherited from IterableAnalyzer constructor
   */
  private void addWords(Set<String> targetWords, KeyValue kv, List<String> foundWords) throws IOException {
    for (String t : new IterableAnalyzer(cleansingAnalyzer, new String(kv.getValue()))) {
      if (targetWords.contains(t)) {
        foundWords.add(t);
      }
    }
  }

}
